Taskgroup decorator #15034

Merged
merged 18 commits into apache:master from taskgroup_decorator
Apr 1, 2021

Conversation

dimberman
Contributor

@dimberman dimberman commented Mar 26, 2021

From #13479

Added a task_group decorator that creates a TaskGroup from a Python callable. Inside the callable, tasks are grouped by calling task functions (Python callables created with the task decorator).

  • The task_group decorator accepts optional positional and keyword arguments, which are passed to the constructor of the TaskGroup class.

  • A TaskGroup can be created with any of the following syntaxes:

@task_group
@task_group()
@task_group(group_id='group_name')
  • The TaskGroup constructor takes one mandatory argument, group_id. If it is not given in the decorator, group_id is set to the name of the Python callable.
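
The three decorator forms and the group_id default can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the code added in this PR: the `task_group` function below is a hypothetical stand-in that does not import Airflow, and instead of opening a real TaskGroup it returns the resolved group_id next to the callable's result so the behaviour is easy to inspect.

```python
import functools


def task_group(python_callable=None, **tg_kwargs):
    """Hypothetical stand-in for illustration; not Airflow's code."""

    def decorate(func):
        # Default group_id to the callable's name unless one was supplied.
        kwargs = {"group_id": func.__name__, **tg_kwargs}

        @functools.wraps(func)
        def factory(*args, **call_kwargs):
            # A real decorator would open TaskGroup(**kwargs) here; this
            # sketch only reports the group_id alongside the result.
            return kwargs["group_id"], func(*args, **call_kwargs)

        return factory

    if callable(python_callable):
        # Bare form: @task_group
        return decorate(python_callable)
    # Called form: @task_group() or @task_group(group_id=...)
    return decorate


@task_group
def bare():
    return "a"


@task_group()
def called():
    return "b"


@task_group(group_id="group_name")
def named():
    return "c"
```

Checking whether the first argument is callable is one common way to let a decorator work both with and without parentheses; whether the PR uses the same check is not stated here.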

The following simple example demonstrates using the task_group decorator to group multiple tasks.

from airflow.decorators import task, task_group


@task
def task_1(value):
    return f'[ Task1 {value} ]'


@task
def task_2(value):
    print(f'[ Task2 {value} ]')


@task_group
def section_1(value):
    return task_2(task_1(value))

The task_group decorator uses the existing TaskGroup context manager that is currently used for creating a TaskGroup, which means nested task groups can be created by nesting callables. The following example demonstrates a nested task group.

@task
def task_start():
    return '[Task_start]'

@task
def task_end():
    print('[ Task_End ]')

@task
def task_1(value):
    return f'[ Task1 {value} ]'

@task
def task_2(value):
    print(f'[ Task2 {value} ]')

@task
def task_3(value):
    return f'[ Task3 {value} ]'

@task
def task_4(value):
    print(f'[ Task4 {value} ]')

@task_group
def section_1(value):

    @task_group
    def section_2(value2):
        return task_4(task_3(value2))

    op1 = task_2(task_1(value))
    return section_2(op1) 

Dedicated test cases for the task_group decorator are in the file
/tests/utils/test_task_group_decorator.py

Recent changes

  • Added logic to append a suffix to a duplicate group_id.
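
One way such suffixing could work is sketched below. It is a minimal illustration, assuming a `__N` suffix pattern (task_group1, task_group1__1, task_group1__2); `unique_id` and `used_ids` are hypothetical names and this is not the code in airflow/utils/task_group.py.

```python
def unique_id(group_id, used_ids):
    """Return group_id, or group_id__N for the first N not yet in use.

    Hypothetical helper for illustration; `used_ids` is a set that is
    updated in place with the id that gets handed out.
    """
    if group_id not in used_ids:
        used_ids.add(group_id)
        return group_id

    suffix = 1
    while f"{group_id}__{suffix}" in used_ids:
        suffix += 1

    candidate = f"{group_id}__{suffix}"
    used_ids.add(candidate)
    return candidate


used = set()
ids = [unique_id("task_group1", used) for _ in range(3)]
```

The first request keeps the plain id and later requests for the same id receive increasing suffixes, so reusing a group does not collide with an earlier one.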

closes: #11870



@github-actions

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@dimberman dimberman force-pushed the taskgroup_decorator branch 2 times, most recently from 6ece11f to def4f1b March 30, 2021 21:18
Member

@ashb ashb left a comment

Not quite sure the duplicate/reusing TG id behaviour is right yet, and code can be simplified a bit I think.

tests/utils/test_task_group.py
execution_date = pendulum.parse("20201109")
with DAG(dag_id="example_duplicate_task_group_id", start_date=execution_date, tags=["example"]) as dag:
    task_group1()
    task_group2()
Member

I would have expected this to fail: this is three separate task groups, all with the same name.

Calling the same TG more than once is when I would expect the __1 suffix to be added, but having actual duplicate TG ids for different groups I think should be an error case.

Contributor Author

    node_ids = {
        'id': None,
        'children': [
            {
                'id': 'task_group1',
                'children': [
                    {'id': 'task_group1.start_task'},
                    {'id': 'task_group1.task'},
                    {'id': 'task_group1.task__1'},
                ],
            },
            {'id': 'task_group1__1', 'children': [{'id': 'task_group1__1.task1'}]},
            {'id': 'task_group1__2', 'children': [{'id': 'task_group1__2.end_task'}]},
        ],
    }

I think this is the correct functionality. With taskgroup decorators people might be importing these from external sources and might therefore not have control over the group_id of that TaskGroup. Automatically adding a suffix can prevent failed DAGs when using these externally created resources.

Member

Hmmm, given there's no way to override the name of a TG, true.

@ashb ashb mentioned this pull request Mar 31, 2021
VBhojawala and others added 9 commits March 31, 2021 06:58
Fixed Typo.

Added test case of multicall taskgroup

Removed reference from docs

Removed print

Example include in docs

Update airflow/utils/task_group.py

Co-authored-by: Ash Berlin-Taylor <[email protected]>
@github-actions

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Mar 31, 2021
@dimberman dimberman merged commit da897c9 into apache:master Apr 1, 2021
@dimberman dimberman deleted the taskgroup_decorator branch April 1, 2021 16:46
Labels: full tests needed, kind:documentation

Successfully merging this pull request may close these issues:

Add @taskgroup decorator

3 participants